package com.chen.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.util.concurrent.SuccessCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

/**
 * 消息生产者
 */
@RestController
public class KafkaProducerController {

    //注入kafka模板
    @Autowired
    private KafkaTemplate kafkaTemplate;

    // 简单发送消息(不带回调参数)
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }

    //带回调，回调方法中监控消息是否发送成功
    @GetMapping("/kafka/callbackOne/{message}")
    public void sendMessage2(@PathVariable("message") String callbackMessage) {
        kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                System.out.println("发送消息成功：" + stringObjectSendResult.getRecordMetadata().topic() + "-"
                        + stringObjectSendResult.getRecordMetadata().partition() + "-" + stringObjectSendResult.getRecordMetadata().offset());
            }
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败："+throwable.getMessage());
            }
        });
    }

    //如果在发送消息时需要创建事务，可以使用 KafkaTemplate 的 executeInTransaction 方法来声明事务
    @GetMapping("/kafka/transaction")
    public void sendMessage3(){
        // 声明事务：后面报错消息不会发出去
        kafkaTemplate.executeInTransaction(operations -> {
            operations.send("topic1","test executeInTransaction");
            throw new RuntimeException("fail");
        });

        // 不声明事务：后面报错但前面消息已经发送成功了
        kafkaTemplate.send("topic1","test executeInTransaction");
        throw new RuntimeException("fail");


    }
}
